use crate::{
    metrics::{self, register_process_result_metrics},
    network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
    service::NetworkMessage,
    sync::SyncMessage,
};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::store::Error;
use beacon_chain::{
    AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
    GossipVerifiedBlock, NotifyExecutionLayer,
    attestation_verification::{self, Error as AttnError, VerifiedAttestation},
    data_availability_checker::AvailabilityCheckErrorCategory,
    light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
    light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
    observed_operations::ObservationOutcome,
    sync_committee_verification::{self, Error as SyncCommitteeError},
    validator_monitor::{get_block_delay_ms, get_slot_delay_ms},
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::{
    SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_PROCESS_GOSSIP_DATA_COLUMN,
};
use logging::crit;
use operation_pool::ReceivedPreCapella;
use slot_clock::SlotClock;
use ssz::Encode;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
use types::{
    Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar,
    DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
    LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
    SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SingleAttestation,
    Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, beacon_block::BlockImportSource,
};

use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
use beacon_processor::{
    DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
    work_reprocessing_queue::{
        QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
        ReprocessQueueMessage,
    },
};

/// Set to `true` to introduce stricter penalties for peers who send some types of late consensus
/// messages.
///
/// NOTE(review): which message types are affected depends on the call sites of this constant
/// (not all visible here) — confirm before flipping to `true`.
const STRICT_LATE_MESSAGE_PENALTIES: bool = false;

/// An attestation that has been validated by the `BeaconChain`.
///
/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to
/// construct this from components which have not passed `BeaconChain` validation.
struct VerifiedUnaggregate<T: BeaconChainTypes> {
    /// The verified attestation, boxed to keep this struct small when moved around.
    attestation: Box<Attestation<T::EthSpec>>,
    /// The indexed form of the attestation, produced during gossip verification.
    indexed_attestation: IndexedAttestation<T::EthSpec>,
}

/// This implementation allows `Self` to be imported to fork choice and other functions on the
/// `BeaconChain`.
impl<T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedUnaggregate<T> {
    fn attestation(&self) -> AttestationRef<'_, T::EthSpec> {
        self.attestation.to_ref()
    }

    fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
        &self.indexed_attestation
    }

    fn into_attestation_and_indices(self) -> (Attestation<T::EthSpec>, Vec<u64>) {
        // Move the attestation out of its `Box` and extract the attesting indices
        // from the indexed form, consuming `self` without any clones.
        (
            *self.attestation,
            self.indexed_attestation.attesting_indices_to_vec(),
        )
    }
}

/// An attestation that failed validation by the `BeaconChain`.
struct RejectedUnaggregate {
    /// The original attestation, retained so the failure handler can act on it
    /// (e.g. peer scoring or reprocessing).
    attestation: Box<SingleAttestation>,
    /// The reason verification failed.
    error: AttnError,
}

/// An aggregate that has been validated by the `BeaconChain`.
///
/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to
/// construct this from components which have not passed `BeaconChain` validation.
struct VerifiedAggregate<T: BeaconChainTypes> {
    /// The verified signed aggregate-and-proof, boxed to keep this struct small when moved around.
    signed_aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
    /// The indexed form of the aggregate's attestation, produced during gossip verification.
    indexed_attestation: IndexedAttestation<T::EthSpec>,
}

/// This implementation allows `Self` to be imported to fork choice and other functions on the
/// `BeaconChain`.
impl<T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedAggregate<T> {
    fn attestation(&self) -> AttestationRef<'_, T::EthSpec> {
        self.signed_aggregate.message().aggregate()
    }

    fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
        &self.indexed_attestation
    }

    /// Efficient clone-free implementation that moves out of the `Box`.
    fn into_attestation_and_indices(self) -> (Attestation<T::EthSpec>, Vec<u64>) {
        // Both fields are consumed independently; no clone is needed for either.
        let indices = self.indexed_attestation.attesting_indices_to_vec();
        (self.signed_aggregate.into_attestation(), indices)
    }
}

/// An aggregate attestation that failed validation by the `BeaconChain`.
struct RejectedAggregate<E: EthSpec> {
    /// The original signed aggregate, retained so the failure handler can act on it
    /// (e.g. peer scoring or reprocessing).
    signed_aggregate: Box<SignedAggregateAndProof<E>>,
    /// The reason verification failed.
    error: AttnError,
}

/// Data for an aggregated or unaggregated attestation that failed verification.
enum FailedAtt<E: EthSpec> {
    /// A single (unaggregated) attestation from an attestation subnet.
    Unaggregate {
        attestation: Box<SingleAttestation>,
        /// The subnet the attestation was received on.
        subnet_id: SubnetId,
        /// Whether the caller intended to import the attestation had it verified.
        should_import: bool,
        /// When the attestation was first seen on the wire.
        seen_timestamp: Duration,
    },
    /// A signed aggregate-and-proof from the aggregate gossip topic.
    Aggregate {
        attestation: Box<SignedAggregateAndProof<E>>,
        /// When the aggregate was first seen on the wire.
        seen_timestamp: Duration,
    },
}

impl<E: EthSpec> FailedAtt<E> {
    /// Returns the root of the beacon block the failed attestation votes for.
    pub fn beacon_block_root(&self) -> &Hash256 {
        &self.attestation_data().beacon_block_root
    }

    /// Returns a human-readable label for the attestation flavour.
    pub fn kind(&self) -> &'static str {
        if matches!(self, Self::Unaggregate { .. }) {
            "unaggregated"
        } else {
            "aggregated"
        }
    }

    /// Borrows the `AttestationData` of the underlying failed attestation.
    pub fn attestation_data(&self) -> &AttestationData {
        match self {
            Self::Unaggregate { attestation, .. } => &attestation.data,
            Self::Aggregate { attestation, .. } => attestation.message().aggregate().data(),
        }
    }
}

impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
    /* Auxiliary functions */

    /// Penalizes a peer for misbehaviour by reporting it to the network thread.
    ///
    /// The report is attributed to `ReportSource::Gossipsub`.
    fn gossip_penalize_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) {
        let report = NetworkMessage::ReportPeer {
            peer_id,
            action,
            source: ReportSource::Gossipsub,
            msg,
        };
        self.send_network_message(report)
    }

    /// Sends the validation result for the message `message_id` (received from
    /// `propagation_source`) to the network thread, which reports it to gossipsub.
    ///
    /// If the result is `Accept`, the message gets forwarded to other peers on the gossip
    /// network.
    ///
    /// Creates a log if there is an internal error.
    pub(crate) fn propagate_validation_result(
        &self,
        message_id: MessageId,
        propagation_source: PeerId,
        validation_result: MessageAcceptance,
    ) {
        self.send_network_message(NetworkMessage::ValidationResult {
            propagation_source,
            message_id,
            validation_result,
        })
    }

    /* Processing functions */

    /// Process the unaggregated attestation received from the gossip network and:
    ///
    /// - If it passes gossip propagation criteria, tell the network thread to forward it.
    /// - Attempt to apply it to fork choice.
    /// - Attempt to add it to the naive aggregation pool.
    ///
    /// Raises a log if there are errors.
    #[allow(clippy::too_many_arguments)]
    pub fn process_gossip_attestation(
        self: Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        attestation: Box<SingleAttestation>,
        subnet_id: SubnetId,
        should_import: bool,
        allow_reprocess: bool,
        seen_timestamp: Duration,
    ) {
        let verification_outcome = match self
            .chain
            .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id))
        {
            Ok(verified) => {
                // Clone the attestation out of the verified wrapper before consuming the
                // wrapper for its indexed form.
                let owned_attestation = Box::new(verified.attestation().clone_as_attestation());
                Ok(VerifiedUnaggregate {
                    attestation: owned_attestation,
                    indexed_attestation: verified.into_indexed_attestation(),
                })
            }
            // On failure, hand the original attestation back for error handling.
            Err(error) => Err(RejectedUnaggregate { attestation, error }),
        };

        self.process_gossip_attestation_result(
            verification_outcome,
            message_id,
            peer_id,
            subnet_id,
            allow_reprocess,
            should_import,
            seen_timestamp,
        );
    }

    /// Verify a batch of unaggregated attestations in a single pass, then handle each
    /// per-attestation result (propagation, import or peer penalty) individually via
    /// `process_gossip_attestation_result`.
    pub fn process_gossip_attestation_batch(
        self: Arc<Self>,
        packages: GossipAttestationBatch,
        allow_reprocess: bool,
    ) {
        let attestations_and_subnets = packages
            .iter()
            .map(|package| (package.attestation.as_ref(), Some(package.subnet_id)));

        let results = match self
            .chain
            .batch_verify_unaggregated_attestations_for_gossip(attestations_and_subnets)
        {
            Ok(results) => results,
            Err(e) => {
                // A batch-level error leaves no per-attestation results to act upon.
                error!(
                    error = ?e,
                    "Batch unagg. attn verification failed"
                );
                return;
            }
        };

        // Sanity check.
        if results.len() != packages.len() {
            // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong
            // peer.
            crit!(
                results = results.len(),
                packages = packages.len(),
                "Batch attestation result mismatch"
            )
        }

        // Map the results into a new `Vec` so that `results` no longer holds a reference to
        // `packages`.
        #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker.
        let results = results
            .into_iter()
            .map(|result| {
                result.map(|verified| {
                    let attestation = verified.attestation().clone_as_attestation();
                    (verified.into_indexed_attestation(), attestation)
                })
            })
            .collect::<Vec<_>>();

        // Note: `zip` truncates to the shorter side if the sanity check above failed.
        for (result, package) in results.into_iter().zip(packages.into_iter()) {
            let result = match result {
                Ok((indexed_attestation, attestation)) => Ok(VerifiedUnaggregate {
                    indexed_attestation,
                    attestation: Box::new(attestation),
                }),
                Err(error) => Err(RejectedUnaggregate {
                    attestation: package.attestation,
                    error,
                }),
            };

            self.process_gossip_attestation_result(
                result,
                package.message_id,
                package.peer_id,
                package.subnet_id,
                allow_reprocess,
                package.should_import,
                package.seen_timestamp,
            );
        }
    }

    /// Handle the outcome of unaggregated attestation verification.
    ///
    /// On success: register the attestation with the validator monitor, propagate it on gossip
    /// (if still timely) and — only when `should_import` is set — apply it to fork choice and
    /// add it to the naive aggregation pool.
    ///
    /// On failure: delegate to `handle_attestation_verification_failure` for peer scoring and
    /// (when `allow_reprocess` is set) possible reprocessing.
    //
    // Clippy warning is ignored since the arguments are all of a different type (i.e., they
    // can't be mixed up) and creating a struct would result in more complexity.
    #[allow(clippy::too_many_arguments)]
    fn process_gossip_attestation_result(
        self: &Arc<Self>,
        result: Result<VerifiedUnaggregate<T>, RejectedUnaggregate>,
        message_id: MessageId,
        peer_id: PeerId,
        subnet_id: SubnetId,
        allow_reprocess: bool,
        should_import: bool,
        seen_timestamp: Duration,
    ) {
        match result {
            Ok(verified_attestation) => {
                let indexed_attestation = &verified_attestation.indexed_attestation;
                let beacon_block_root = indexed_attestation.data().beacon_block_root;

                // Register the attestation with any monitored validators.
                self.chain
                    .validator_monitor
                    .read()
                    .register_gossip_unaggregated_attestation(
                        seen_timestamp,
                        indexed_attestation,
                        &self.chain.slot_clock,
                    );

                // If the attestation is still timely, propagate it.
                self.propagate_attestation_if_timely(
                    verified_attestation.attestation(),
                    message_id,
                    peer_id,
                );

                // Propagation happened above; when the caller opted out of importing, skip
                // fork choice and the aggregation pool entirely.
                if !should_import {
                    return;
                }

                metrics::inc_counter(
                    &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
                );

                if let Err(e) = self
                    .chain
                    .apply_attestation_to_fork_choice(&verified_attestation)
                {
                    match e {
                        // An attestation invalid for fork choice is a peer-level issue, not an
                        // internal failure — log at `debug` only.
                        BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(
                            e,
                        )) => {
                            debug!(
                                reason = ?e,
                                %peer_id,
                                ?beacon_block_root,
                                "Attestation invalid for fork choice"
                            )
                        }
                        e => error!(
                            reason = ?e,
                            %peer_id,
                            ?beacon_block_root,
                            "Error applying attestation to fork choice"
                        ),
                    }
                }

                // Failure to aggregate is non-fatal; the attestation was already propagated and
                // offered to fork choice above.
                if let Err(e) = self
                    .chain
                    .add_to_naive_aggregation_pool(&verified_attestation)
                {
                    debug!(
                        reason = ?e,
                        %peer_id,
                        ?beacon_block_root,
                        "Attestation invalid for agg pool"
                    )
                }

                metrics::inc_counter(
                    &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
                );
            }
            Err(RejectedUnaggregate { attestation, error }) => {
                self.handle_attestation_verification_failure(
                    peer_id,
                    message_id,
                    FailedAtt::Unaggregate {
                        attestation,
                        subnet_id,
                        should_import,
                        seen_timestamp,
                    },
                    allow_reprocess,
                    error,
                    seen_timestamp,
                );
            }
        }
    }

    /// Process the aggregated attestation received from the gossip network and:
    ///
    /// - If it passes gossip propagation criteria, tell the network thread to forward it.
    /// - Attempt to apply it to fork choice.
    /// - Attempt to add it to the block inclusion pool.
    ///
    /// Raises a log if there are errors.
    pub fn process_gossip_aggregate(
        self: Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
        allow_reprocess: bool,
        seen_timestamp: Duration,
    ) {
        // Record the block root up-front since `aggregate` is moved below.
        let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root;

        let verification_outcome = match self
            .chain
            .verify_aggregated_attestation_for_gossip(&aggregate)
        {
            Ok(verified) => {
                // Consume the verified wrapper for its indexed form, then move the original
                // aggregate in alongside it.
                let indexed_attestation = verified.into_indexed_attestation();
                Ok(VerifiedAggregate {
                    signed_aggregate: aggregate,
                    indexed_attestation,
                })
            }
            Err(error) => Err(RejectedAggregate {
                signed_aggregate: aggregate,
                error,
            }),
        };

        self.process_gossip_aggregate_result(
            verification_outcome,
            beacon_block_root,
            message_id,
            peer_id,
            allow_reprocess,
            seen_timestamp,
        );
    }

    /// Verify a batch of aggregated attestations in a single pass, then handle each
    /// per-aggregate result (propagation, import or peer penalty) individually via
    /// `process_gossip_aggregate_result`.
    pub fn process_gossip_aggregate_batch(
        self: Arc<Self>,
        packages: Vec<GossipAggregatePackage<T::EthSpec>>,
        allow_reprocess: bool,
    ) {
        let aggregates = packages.iter().map(|package| package.aggregate.as_ref());

        let results = match self
            .chain
            .batch_verify_aggregated_attestations_for_gossip(aggregates)
        {
            Ok(results) => results,
            Err(e) => {
                // A batch-level error leaves no per-aggregate results to act upon.
                error!(
                    error = ?e,
                    "Batch agg. attn verification failed"
                );
                return;
            }
        };

        // Sanity check.
        if results.len() != packages.len() {
            // The log is `crit` since in this scenario we might be penalizing/rewarding the wrong
            // peer.
            crit!(
                results = results.len(),
                packages = packages.len(),
                "Batch agg. attestation result mismatch"
            )
        }

        // Map the results into a new `Vec` so that `results` no longer holds a reference to
        // `packages`.
        #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker.
        let results = results
            .into_iter()
            .map(|result| result.map(|verified| verified.into_indexed_attestation()))
            .collect::<Vec<_>>();

        // Note: `zip` truncates to the shorter side if the sanity check above failed.
        for (result, package) in results.into_iter().zip(packages.into_iter()) {
            let result = match result {
                Ok(indexed_attestation) => Ok(VerifiedAggregate {
                    indexed_attestation,
                    signed_aggregate: package.aggregate,
                }),
                Err(error) => Err(RejectedAggregate {
                    signed_aggregate: package.aggregate,
                    error,
                }),
            };

            self.process_gossip_aggregate_result(
                result,
                package.beacon_block_root,
                package.message_id,
                package.peer_id,
                allow_reprocess,
                package.seen_timestamp,
            );
        }
    }

    /// Handle the outcome of aggregate verification.
    ///
    /// On success: propagate the aggregate on gossip (if still timely), register it with the
    /// validator monitor, apply it to fork choice and add it to the block inclusion (op) pool.
    ///
    /// On failure: delegate to `handle_attestation_verification_failure` for peer scoring and
    /// (when `allow_reprocess` is set) possible reprocessing.
    fn process_gossip_aggregate_result(
        self: &Arc<Self>,
        result: Result<VerifiedAggregate<T>, RejectedAggregate<T::EthSpec>>,
        beacon_block_root: Hash256,
        message_id: MessageId,
        peer_id: PeerId,
        allow_reprocess: bool,
        seen_timestamp: Duration,
    ) {
        match result {
            Ok(verified_aggregate) => {
                let aggregate = &verified_aggregate.signed_aggregate;
                let indexed_attestation = &verified_aggregate.indexed_attestation;

                // If the attestation is still timely, propagate it.
                self.propagate_attestation_if_timely(
                    verified_aggregate.attestation(),
                    message_id,
                    peer_id,
                );

                // Register the attestation with any monitored validators.
                self.chain
                    .validator_monitor
                    .read()
                    .register_gossip_aggregated_attestation(
                        seen_timestamp,
                        aggregate,
                        indexed_attestation,
                        &self.chain.slot_clock,
                    );

                metrics::inc_counter(
                    &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
                );

                if let Err(e) = self
                    .chain
                    .apply_attestation_to_fork_choice(&verified_aggregate)
                {
                    match e {
                        // An aggregate invalid for fork choice is a peer-level issue, not an
                        // internal failure — log at `debug` only.
                        BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(
                            e,
                        )) => {
                            debug!(
                                reason = ?e,
                                %peer_id,
                                ?beacon_block_root,
                                "Aggregate invalid for fork choice"
                            )
                        }
                        e => error!(
                            reason = ?e,
                            %peer_id,
                            ?beacon_block_root,
                            "Error applying aggregate to fork choice"
                        ),
                    }
                }

                // Failure to enter the op pool is non-fatal; the aggregate was already
                // propagated and offered to fork choice above.
                if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_aggregate) {
                    debug!(
                        reason = ?e,
                        %peer_id,
                        ?beacon_block_root,
                        "Attestation invalid for op pool"
                    )
                }

                metrics::inc_counter(
                    &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL,
                );
            }
            Err(RejectedAggregate {
                signed_aggregate,
                error,
            }) => {
                // Report the failure to gossipsub
                self.handle_attestation_verification_failure(
                    peer_id,
                    message_id,
                    FailedAtt::Aggregate {
                        attestation: signed_aggregate,
                        seen_timestamp,
                    },
                    allow_reprocess,
                    error,
                    seen_timestamp,
                );
            }
        }
    }

    /// Process a data column sidecar received from the gossip network.
    ///
    /// On successful verification: propagate the sidecar (Accept), record delay metrics, then
    /// continue to `process_gossip_verified_data_column`.
    ///
    /// On failure: the error variant determines the gossipsub acceptance (Accept / Ignore /
    /// Reject), whether the peer is penalized, and whether sync is asked for an unknown parent.
    #[instrument(
        name = SPAN_PROCESS_GOSSIP_DATA_COLUMN,
        parent = None,
        level = "debug",
        skip_all,
        fields(slot = %column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index),
    )]
    pub async fn process_gossip_data_column_sidecar(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        subnet_id: DataColumnSubnetId,
        column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
        seen_duration: Duration,
    ) {
        let slot = column_sidecar.slot();
        let block_root = column_sidecar.block_root();
        let index = column_sidecar.index;
        let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
        // Log metrics to track delay from other nodes on the network.
        metrics::observe_duration(
            &metrics::BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME,
            delay,
        );
        match self
            .chain
            .verify_data_column_sidecar_for_gossip(column_sidecar.clone(), subnet_id)
        {
            Ok(gossip_verified_data_column) => {
                metrics::inc_counter(
                    &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL,
                );

                debug!(
                    %slot,
                    %block_root,
                    %index,
                    "Successfully verified gossip data column sidecar"
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

                // Log metrics to keep track of propagation delay times.
                if let Some(duration) = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .ok()
                    .and_then(|now| now.checked_sub(seen_duration))
                {
                    metrics::observe_duration(
                        &metrics::BEACON_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME,
                        duration,
                    );
                }

                self.process_gossip_verified_data_column(
                    peer_id,
                    gossip_verified_data_column,
                    seen_duration,
                )
                .await
            }
            Err(err) => {
                match err {
                    // Already known via the EL: accept for propagation but do not re-process.
                    GossipDataColumnError::PriorKnownUnpublished => {
                        debug!(
                            %slot,
                            %block_root,
                            %index,
                            "Gossip data column already processed via the EL. Accepting the column sidecar without re-processing."
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Accept,
                        );
                    }
                    // Unknown parent: hand off to sync to fetch the ancestry. No acceptance
                    // verdict or penalty is issued here.
                    GossipDataColumnError::ParentUnknown { parent_root } => {
                        debug!(
                            action = "requesting parent",
                            %block_root,
                            %parent_root,
                            "Unknown parent hash for column"
                        );
                        self.send_sync_message(SyncMessage::UnknownParentDataColumn(
                            peer_id,
                            column_sidecar,
                        ));
                    }
                    // Internal errors are our fault, not the peer's: log `crit` and take no
                    // action against the peer.
                    GossipDataColumnError::PubkeyCacheTimeout
                    | GossipDataColumnError::BeaconChainError(_) => {
                        crit!(
                            error = ?err,
                            "Internal error when verifying column sidecar"
                        )
                    }
                    // These variants indicate an objectively invalid sidecar: reject it on
                    // gossip and apply a low-tolerance penalty to the sender.
                    GossipDataColumnError::ProposalSignatureInvalid
                    | GossipDataColumnError::UnknownValidator(_)
                    | GossipDataColumnError::ProposerIndexMismatch { .. }
                    | GossipDataColumnError::IsNotLaterThanParent { .. }
                    | GossipDataColumnError::InvalidSubnetId { .. }
                    | GossipDataColumnError::InvalidInclusionProof
                    | GossipDataColumnError::InvalidKzgProof { .. }
                    | GossipDataColumnError::UnexpectedDataColumn
                    | GossipDataColumnError::InvalidColumnIndex(_)
                    | GossipDataColumnError::MaxBlobsPerBlockExceeded { .. }
                    | GossipDataColumnError::InconsistentCommitmentsLength { .. }
                    | GossipDataColumnError::InconsistentProofsLength { .. }
                    | GossipDataColumnError::NotFinalizedDescendant { .. } => {
                        debug!(
                            error = ?err,
                            %slot,
                            %block_root,
                            %index,
                            "Could not verify column sidecar for gossip. Rejecting the column sidecar"
                        );
                        // Prevent recurring behaviour by penalizing the peer slightly.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::LowToleranceError,
                            "gossip_data_column_low",
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Reject,
                        );
                    }
                    GossipDataColumnError::PriorKnown { .. } => {
                        // Data column is available via either the EL or reconstruction.
                        // Do not penalise the peer.
                        // Gossip filter should filter any duplicates received after this.
                        debug!(
                            %slot,
                            %block_root,
                            %index,
                            "Received already available column sidecar. Ignoring the column sidecar"
                        )
                    }
                    // Timing-related issues: ignore on gossip (do not propagate or reject) with
                    // only a high-tolerance (mild) penalty.
                    GossipDataColumnError::FutureSlot { .. }
                    | GossipDataColumnError::PastFinalizedSlot { .. } => {
                        debug!(
                            error = ?err,
                            %slot,
                            %block_root,
                            %index,
                            "Could not verify column sidecar for gossip. Ignoring the column sidecar"
                        );
                        // Prevent recurring behaviour by penalizing the peer slightly.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "gossip_data_column_high",
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                }
            }
        }
    }

    #[allow(clippy::too_many_arguments)]
    #[instrument(
        name = SPAN_PROCESS_GOSSIP_BLOB,
        parent = None,
        level = "debug",
        skip_all,
        fields(
            slot = ?blob_sidecar.slot(),
            block_root = ?blob_sidecar.block_root(),
            index = blob_sidecar.index),
    )]
    pub async fn process_gossip_blob(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        _peer_client: Client,
        blob_index: u64,
        blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
        seen_duration: Duration,
    ) {
        let slot = blob_sidecar.slot();
        let root = blob_sidecar.block_root();
        let index = blob_sidecar.index;
        let commitment = blob_sidecar.kzg_commitment;
        let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
        // Log metrics to track delay from other nodes on the network.
        metrics::set_gauge(&metrics::BEACON_BLOB_DELAY_GOSSIP, delay.as_millis() as i64);
        match self
            .chain
            .verify_blob_sidecar_for_gossip(blob_sidecar.clone(), blob_index)
        {
            Ok(gossip_verified_blob) => {
                metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL);

                if delay >= self.chain.slot_clock.unagg_attestation_production_delay() {
                    metrics::inc_counter(&metrics::BEACON_BLOB_GOSSIP_ARRIVED_LATE_TOTAL);
                    debug!(
                        block_root = ?gossip_verified_blob.block_root(),
                        proposer_index = gossip_verified_blob.block_proposer_index(),
                        slot = %gossip_verified_blob.slot(),
                        delay = ?delay,
                        commitment = %gossip_verified_blob.kzg_commitment(),
                        "Gossip blob arrived late"
                    );
                }

                debug!(
                    %slot,
                    %root,
                    %index,
                    commitment = %gossip_verified_blob.kzg_commitment(),
                    "Successfully verified gossip blob"
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

                // Log metrics to keep track of propagation delay times.
                if let Some(duration) = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .ok()
                    .and_then(|now| now.checked_sub(seen_duration))
                {
                    metrics::set_gauge(
                        &metrics::BEACON_BLOB_DELAY_GOSSIP_VERIFICATION,
                        duration.as_millis() as i64,
                    );
                }
                self.process_gossip_verified_blob(peer_id, gossip_verified_blob, seen_duration)
                    .await
            }
            Err(err) => {
                match err {
                    GossipBlobError::ParentUnknown { parent_root } => {
                        debug!(
                            action = "requesting parent",
                            block_root = %root,
                            parent_root = %parent_root,
                            %commitment,
                            "Unknown parent hash for blob"
                        );
                        self.send_sync_message(SyncMessage::UnknownParentBlob(
                            peer_id,
                            blob_sidecar,
                        ));
                    }
                    GossipBlobError::PubkeyCacheTimeout | GossipBlobError::BeaconChainError(_) => {
                        crit!(
                            error = ?err,
                            "Internal error when verifying blob sidecar"
                        )
                    }
                    GossipBlobError::ProposalSignatureInvalid
                    | GossipBlobError::UnknownValidator(_)
                    | GossipBlobError::ProposerIndexMismatch { .. }
                    | GossipBlobError::BlobIsNotLaterThanParent { .. }
                    | GossipBlobError::InvalidSubnet { .. }
                    | GossipBlobError::InvalidInclusionProof
                    | GossipBlobError::KzgError(_)
                    | GossipBlobError::NotFinalizedDescendant { .. } => {
                        warn!(
                            error = ?err,
                            %slot,
                            %root,
                            %index,
                            %commitment,
                            "Could not verify blob sidecar for gossip. Rejecting the blob sidecar"
                        );
                        // Prevent recurring behaviour by penalizing the peer.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::LowToleranceError,
                            "gossip_blob_low",
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Reject,
                        );
                    }
                    GossipBlobError::RepeatBlob { .. } => {
                        // We may have received the blob from the EL. Do not penalise the peer.
                        // Gossip filter should filter any duplicates received after this.
                        debug!(
                            %slot,
                            %root,
                            %index,
                            "Received already available blob sidecar. Ignoring the blob sidecar"
                        )
                    }
                    GossipBlobError::FutureSlot { .. } => {
                        debug!(
                            error = ?err,
                            %slot,
                            %root,
                            %index,
                            %commitment,
                            "Could not verify blob sidecar for gossip. Ignoring the blob sidecar"
                        );
                        // Prevent recurring behaviour by penalizing the peer slightly.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "gossip_blob_high",
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                    GossipBlobError::PastFinalizedSlot { .. } => {
                        debug!(
                            error = ?err,
                            %slot,
                            %root,
                            %index,
                            %commitment,
                            "Could not verify blob sidecar for gossip. Ignoring the blob sidecar"
                        );
                        // Prevent recurring behaviour by penalizing the peer. A low-tolerance
                        // error is fine because there's no reason for peers to be propagating old
                        // blobs on gossip, even if their view of finality is lagging.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::LowToleranceError,
                            "gossip_blob_low",
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                }
            }
        }
    }

    /// Import a blob sidecar that has already passed gossip verification.
    ///
    /// On success this may complete the block's availability (triggering a head
    /// recompute) and notifies sync; import failures other than duplicates
    /// penalize the sending peer.
    async fn process_gossip_verified_blob(
        self: &Arc<Self>,
        peer_id: PeerId,
        verified_blob: GossipVerifiedBlob<T>,
        // Currently unused; kept for parity with the other gossip handlers.
        _seen_duration: Duration,
    ) {
        let import_timer = Instant::now();
        // Capture identifying info before `verified_blob` is moved below.
        let block_root = verified_blob.block_root();
        let blob_slot = verified_blob.slot();
        let blob_index = verified_blob.id().index;

        let result = self.chain.process_gossip_blob(verified_blob).await;
        register_process_result_metrics(&result, metrics::BlockSource::Gossip, "blob");

        let fully_imported = matches!(&result, Ok(AvailabilityProcessingStatus::Imported(_)));

        match &result {
            Ok(AvailabilityProcessingStatus::Imported(imported_root)) => {
                debug!(
                    block_root = %imported_root,
                    "Gossipsub blob processed - imported fully available block"
                );
                // The block just became fully available; fork choice may now
                // select it as head.
                self.chain.recompute_head_at_current_slot().await;

                metrics::set_gauge(
                    &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
                    import_timer.elapsed().as_millis() as i64,
                );
            }
            Ok(AvailabilityProcessingStatus::MissingComponents(slot, root)) => {
                debug!(
                    %slot,
                    %blob_index,
                    block_root = %root,
                    "Processed gossip blob - waiting for other components"
                );
            }
            Err(BlockError::DuplicateFullyImported(_)) => {
                // Already imported (possibly via another source); no peer penalty.
                debug!(
                    ?block_root,
                    blob_index, "Ignoring gossip blob already imported"
                );
            }
            Err(err) => {
                debug!(
                    outcome = ?err,
                    ?block_root,
                    %blob_slot,
                    blob_index,
                    "Invalid gossip blob"
                );
                // Penalize the peer for a blob that failed import.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::MidToleranceError,
                    "bad_gossip_blob_ssz",
                );
            }
        }

        // Sync may be awaiting an event for when a block in the da_checker is
        // finally imported; a block can become imported after processing either
        // a block or a blob. Notify on `Imported` only — never on blob errors.
        if fully_imported {
            self.send_sync_message(SyncMessage::GossipBlockProcessResult {
                block_root,
                imported: true,
            });
        }
    }

    /// Import a data column sidecar that has already passed gossip verification.
    ///
    /// Successful import may complete the block's availability, in which case the
    /// head is recomputed and sync is notified. If components are still missing,
    /// a delayed column-reconstruction attempt may be scheduled. Import failures
    /// other than duplicates penalize the sending peer.
    async fn process_gossip_verified_data_column(
        self: &Arc<Self>,
        peer_id: PeerId,
        verified_data_column: GossipVerifiedDataColumn<T>,
        // This value is not used presently, but it might come in handy for debugging.
        _seen_duration: Duration,
    ) {
        let processing_start_time = Instant::now();
        // Capture identifying info before `verified_data_column` is moved below.
        let block_root = verified_data_column.block_root();
        let data_column_slot = verified_data_column.slot();
        let data_column_index = verified_data_column.index();

        let result = self
            .chain
            .process_gossip_data_columns(vec![verified_data_column], || Ok(()))
            .await;
        register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");

        match &result {
            Ok(availability) => match availability {
                AvailabilityProcessingStatus::Imported(block_root) => {
                    debug!(
                        %block_root,
                        "Gossipsub data column processed, imported fully available block"
                    );
                    // The block just became fully available; fork choice may now
                    // select it as head.
                    self.chain.recompute_head_at_current_slot().await;

                    metrics::set_gauge(
                        &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
                        processing_start_time.elapsed().as_millis() as i64,
                    );
                }
                AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
                    trace!(
                        %slot,
                        %data_column_index,
                        %block_root,
                        "Processed data column, waiting for other components"
                    );

                    if self
                        .chain
                        .data_availability_checker
                        .custody_context()
                        .should_attempt_reconstruction(
                            slot.epoch(T::EthSpec::slots_per_epoch()),
                            &self.chain.spec,
                        )
                    {
                        // Instead of triggering reconstruction immediately, schedule it to be run. If
                        // another column arrives, it either completes availability or pushes
                        // reconstruction back a bit.
                        let cloned_self = Arc::clone(self);
                        // Copy the root out so the 'static reprocess closure
                        // doesn't borrow from `result`.
                        let block_root = *block_root;

                        if self
                            .beacon_processor_send
                            .try_send(WorkEvent {
                                drop_during_sync: false,
                                work: Work::Reprocess(
                                    ReprocessQueueMessage::DelayColumnReconstruction(
                                        QueuedColumnReconstruction {
                                            block_root,
                                            slot: *slot,
                                            process_fn: Box::pin(async move {
                                                cloned_self
                                                    .attempt_data_column_reconstruction(block_root)
                                                    .await;
                                            }),
                                        },
                                    ),
                                ),
                            })
                            .is_err()
                        {
                            // Best-effort: failing to schedule only delays reconstruction.
                            warn!("Unable to send reconstruction to reprocessing");
                        }
                    }
                }
            },
            Err(BlockError::DuplicateFullyImported(_)) => {
                // Already imported (possibly via another source); no peer penalty.
                debug!(
                    ?block_root,
                    data_column_index, "Ignoring gossip column already imported"
                );
            }
            Err(err) => {
                debug!(
                    outcome = ?err,
                    ?block_root,
                    block_slot =  %data_column_slot,
                    data_column_index,
                    "Invalid gossip data column"
                );
                // Penalize the peer for a column that failed import.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::MidToleranceError,
                    "bad_gossip_data_column_ssz",
                );
            }
        }

        // Sync may be awaiting an event for when a block in the da_checker is
        // finally imported; a block can become imported after processing either
        // a block or a data column. Notify on `Imported` only — never on data
        // column errors.
        if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
            self.send_sync_message(SyncMessage::GossipBlockProcessResult {
                block_root,
                imported: true,
            });
        }
    }

    /// Handle a beacon block received via gossip:
    ///
    /// - If it passes gossip propagation criteria, tell the network thread to forward it.
    /// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to
    ///   be downloaded.
    ///
    /// Raises a log if there are errors.
    #[allow(clippy::too_many_arguments)]
    #[instrument(
        name = SPAN_PROCESS_GOSSIP_BLOCK,
        parent = None,
        level = "debug",
        skip_all,
        fields(block_root = tracing::field::Empty),
    )]
    pub async fn process_gossip_block(
        self: Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        block: Arc<SignedBeaconBlock<T::EthSpec>>,
        duplicate_cache: DuplicateCache,
        invalid_block_storage: InvalidBlockStorage,
        seen_duration: Duration,
    ) {
        // Gossip-verify first. On failure the helper has already logged,
        // penalized and/or messaged sync, so there is nothing more to do here.
        let Some(gossip_verified_block) = self
            .process_gossip_unverified_block(
                message_id,
                peer_id,
                peer_client,
                block.clone(),
                seen_duration,
            )
            .await
        else {
            return;
        };

        let block_root = gossip_verified_block.block_root;
        Span::current().record("block_root", block_root.to_string());

        // Only import if no other task currently holds this root in the
        // duplicate cache.
        match duplicate_cache.check_and_insert(block_root) {
            Some(handle) => {
                self.process_gossip_verified_block(
                    peer_id,
                    gossip_verified_block,
                    invalid_block_storage,
                    seen_duration,
                )
                .await;
                // Dropping the handle removes the entry from the cache.
                drop(handle);
            }
            None => {
                // A concurrent import (e.g. via RPC) already claimed this root.
                debug!(
                    %block_root,
                    "RPC block is being imported"
                );
            }
        }
    }

    /// Process the beacon block received from the gossip network and
    /// if it passes gossip propagation criteria, tell the network thread to forward it.
    ///
    /// Returns the `GossipVerifiedBlock` if verification passes and raises a log if there are errors.
    ///
    /// Note: blocks whose slot is ahead of our clock are re-queued for deferred
    /// import and also return `None`, so `None` does not always mean the block
    /// failed verification.
    async fn process_gossip_unverified_block(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        peer_client: Client,
        block: Arc<SignedBeaconBlock<T::EthSpec>>,
        seen_duration: Duration,
    ) -> Option<GossipVerifiedBlock<T>> {
        let block_delay =
            get_block_delay_ms(seen_duration, block.message(), &self.chain.slot_clock);

        let verification_result = self
            .chain
            .clone()
            .verify_block_for_gossip(block.clone())
            .await;

        // Log metrics to track delay from other nodes on the network, only for
        // blocks that pass gossip verification.
        if verification_result.is_ok() {
            metrics::set_gauge(
                &metrics::BEACON_BLOCK_DELAY_GOSSIP,
                block_delay.as_millis() as i64,
            );
        }

        // Prefer the root cached by successful verification; otherwise compute
        // it so the failure paths below can still log/cache against it.
        let block_root = if let Ok(verified_block) = &verification_result {
            verified_block.block_root
        } else {
            block.canonical_root()
        };

        // Write the time the block was observed into delay cache.
        self.chain.block_times_cache.write().set_time_observed(
            block_root,
            block.slot(),
            seen_duration,
            Some(peer_id.to_string()),
            Some(peer_client.to_string()),
        );

        let verified_block = match verification_result {
            Ok(verified_block) => {
                // Flag blocks arriving after the unaggregated-attestation deadline.
                if block_delay >= self.chain.slot_clock.unagg_attestation_production_delay() {
                    metrics::inc_counter(&metrics::BEACON_BLOCK_DELAY_GOSSIP_ARRIVED_LATE_TOTAL);
                    debug!(
                        block_root = ?verified_block.block_root,
                        proposer_index = verified_block.block.message().proposer_index(),
                        slot = ?verified_block.block.slot(),
                        ?block_delay,
                        "Gossip block arrived late"
                    );
                }

                info!(
                    slot = %verified_block.block.slot(),
                    root = ?verified_block.block_root,
                    "New block received"
                );
                // Tell the network thread to forward the block to our peers.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

                // Log metrics to keep track of propagation delay times.
                if let Some(duration) = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .ok()
                    .and_then(|now| now.checked_sub(seen_duration))
                {
                    metrics::set_gauge(
                        &metrics::BEACON_BLOCK_DELAY_GOSSIP_VERIFICATION,
                        duration.as_millis() as i64,
                    );
                }

                verified_block
            }
            Err(e @ BlockError::Slashable) => {
                warn!(
                    error = ?e,
                    "Received equivocating block from peer"
                );
                // Punish the peer for submitting an equivocation, but not too
                // harshly: honest peers may conceivably forward equivocating
                // blocks to us from time to time.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::MidToleranceError,
                    "gossip_block_mid",
                );
                return None;
            }
            Err(BlockError::ParentUnknown { .. }) => {
                // Ask sync to fetch the missing ancestors; no peer penalty and
                // no propagation verdict is issued here.
                debug!(?block_root, "Unknown parent for gossip block");
                self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
                return None;
            }
            Err(e @ BlockError::BeaconChainError(_)) => {
                // Internal error on our side; don't blame the peer.
                debug!(
                    error = ?e,
                    "Gossip block beacon chain error"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return None;
            }
            Err(
                BlockError::DuplicateFullyImported(_)
                | BlockError::DuplicateImportStatusUnknown(..),
            ) => {
                // Seen before (possibly imported via another source); ignore
                // quietly with no penalty.
                debug!(
                    %block_root,
                    "Gossip block is already known"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return None;
            }
            Err(e @ BlockError::FutureSlot { .. }) => {
                debug!(
                    error = %e,
                    "Could not verify block for gossip. Ignoring the block"
                );
                // Prevent recurring behaviour by penalizing the peer slightly.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "gossip_block_high",
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return None;
            }
            Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
            | Err(e @ BlockError::NotFinalizedDescendant { .. }) => {
                debug!(
                    error = %e,
                    "Could not verify block for gossip. Ignoring the block"
                );
                // The spec says we must IGNORE these blocks but there's no reason for an honest
                // and non-buggy client to be gossiping blocks that blatantly conflict with
                // finalization. Old versions of Erigon/Caplin are known to gossip pre-finalization
                // blocks and we want to isolate them to encourage an update.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "gossip_block_low",
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return None;
            }
            Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
                // Execution payload error that does not implicate the gossiping
                // peer (per `penalize_peer()`), so only IGNORE.
                debug!(error = %e, "Could not verify block for gossip. Ignoring the block");
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return None;
            }
            // All of these variants indicate an invalid block: REJECT it on
            // gossip and penalize the sender.
            Err(e @ BlockError::StateRootMismatch { .. })
            | Err(e @ BlockError::IncorrectBlockProposer { .. })
            | Err(e @ BlockError::BlockSlotLimitReached)
            | Err(e @ BlockError::NonLinearSlots)
            | Err(e @ BlockError::UnknownValidator(_))
            | Err(e @ BlockError::PerBlockProcessingError(_))
            | Err(e @ BlockError::NonLinearParentRoots)
            | Err(e @ BlockError::BlockIsNotLaterThanParent { .. })
            | Err(e @ BlockError::InvalidSignature(_))
            | Err(e @ BlockError::WeakSubjectivityConflict)
            | Err(e @ BlockError::InconsistentFork(_))
            | Err(e @ BlockError::ExecutionPayloadError(_))
            | Err(e @ BlockError::ParentExecutionPayloadInvalid { .. })
            | Err(e @ BlockError::KnownInvalidExecutionPayload(_))
            | Err(e @ BlockError::GenesisBlock)
            | Err(e @ BlockError::InvalidBlobCount { .. }) => {
                warn!(error = %e, "Could not verify block for gossip. Rejecting the block");
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "gossip_block_low",
                );
                return None;
            }
            // Note: This error variant cannot be reached when doing gossip validation
            // as we do not do availability checks here.
            Err(e @ BlockError::AvailabilityCheck(_)) => {
                crit!(error = %e, "Internal block gossip validation error. Availability check during gossip validation");
                return None;
            }
            // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
            Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
                error!(error = %e, "Internal block gossip validation error");
                return None;
            }
        };

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

        // Register the block with any monitored validators.
        //
        // Run this event *prior* to importing the block, where the block is only partially
        // verified.
        self.chain.validator_monitor.read().register_gossip_block(
            seen_duration,
            verified_block.block.message(),
            verified_block.block_root,
            &self.chain.slot_clock,
        );

        let block_slot = verified_block.block.slot();
        let block_root = verified_block.block_root;

        // Try read the current slot to determine if this block should be imported now or after some
        // delay.
        match self.chain.slot() {
            // We only need to do a simple check about the block slot and the current slot since the
            // `verify_block_for_gossip` function already ensures that the block is within the
            // tolerance for block imports.
            Ok(current_slot) if block_slot > current_slot => {
                warn!(
                    %block_slot,
                    ?block_root,
                    msg = "if this happens consistently, check system clock",
                    "Block arrived early"
                );

                // Take note of how early this block arrived.
                if let Some(duration) = self
                    .chain
                    .slot_clock
                    .start_of(block_slot)
                    .and_then(|start| start.checked_sub(seen_duration))
                {
                    metrics::observe_duration(
                        &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_EARLY_SECONDS,
                        duration,
                    );
                }

                metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL);

                // Re-queue the verified block so it is imported once its slot
                // arrives, instead of importing it early.
                let inner_self = self.clone();
                let process_fn = Box::pin(async move {
                    let invalid_block_storage = inner_self.invalid_block_storage.clone();
                    inner_self
                        .process_gossip_verified_block(
                            peer_id,
                            verified_block,
                            invalid_block_storage,
                            seen_duration,
                        )
                        .await;
                });
                if self
                    .beacon_processor_send
                    .try_send(WorkEvent {
                        drop_during_sync: false,
                        work: Work::Reprocess(ReprocessQueueMessage::EarlyBlock(
                            QueuedGossipBlock {
                                beacon_block_slot: block_slot,
                                beacon_block_root: block_root,
                                process_fn,
                            },
                        )),
                    })
                    .is_err()
                {
                    error!(
                        %block_slot,
                        ?block_root,
                        location = "block gossip",
                        "Failed to defer block import"
                    )
                }
                None
            }
            Ok(_) => Some(verified_block),
            Err(e) => {
                // Could not read the slot clock; abandon the import attempt.
                error!(
                    error = ?e,
                    %block_slot,
                    ?block_root,
                    location = "block gossip",
                    "Failed to defer block import"
                );
                None
            }
        }
    }

    /// Process the beacon block that has already passed gossip verification.
    ///
    /// Spawns a parallel EL blob fetch, attempts full import into the beacon
    /// chain, recomputes the head on success, and always reports the outcome to
    /// sync.
    ///
    /// Raises a log if there are errors.
    #[instrument(skip_all)]
    async fn process_gossip_verified_block(
        self: Arc<Self>,
        peer_id: PeerId,
        verified_block: GossipVerifiedBlock<T>,
        invalid_block_storage: InvalidBlockStorage,
        // Currently unused; kept for parity with the other gossip handlers.
        _seen_duration: Duration,
    ) {
        let processing_start_time = Instant::now();
        // Keep a handle on the block itself: `verified_block` is consumed by
        // `process_block` below but we still log/persist the block afterwards.
        let block = verified_block.block.block_cloned();
        let block_root = verified_block.block_root;

        // Block is gossip valid. Attempt to fetch blobs from the EL using versioned hashes derived
        // from kzg commitments, without having to wait for all blobs to be sent from the peers.
        let publish_blobs = true;
        let self_clone = self.clone();
        let block_clone = block.clone();
        let current_span = Span::current();
        self.executor.spawn(
            async move {
                self_clone
                    .fetch_engine_blobs_and_publish(block_clone, block_root, publish_blobs)
                    .await
            }
            .instrument(current_span),
            "fetch_blobs_gossip",
        );

        let result = self
            .chain
            .process_block(
                block_root,
                verified_block,
                NotifyExecutionLayer::Yes,
                BlockImportSource::Gossip,
                || Ok(()),
            )
            .await;
        register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block");

        match &result {
            Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
                // Notify the reprocess queue that this block (and its parent
                // link) is now imported.
                if self
                    .beacon_processor_send
                    .try_send(WorkEvent {
                        drop_during_sync: false,
                        work: Work::Reprocess(ReprocessQueueMessage::BlockImported {
                            block_root: *block_root,
                            parent_root: block.message().parent_root(),
                        }),
                    })
                    .is_err()
                {
                    error!(
                        source = "gossip",
                        ?block_root,
                        "Failed to inform block import"
                    )
                };

                debug!(
                    ?block_root,
                    %peer_id,
                    "Gossipsub block processed"
                );

                // The block is fully imported; fork choice may now select it.
                self.chain.recompute_head_at_current_slot().await;

                metrics::set_gauge(
                    &metrics::BEACON_BLOCK_DELAY_FULL_VERIFICATION,
                    processing_start_time.elapsed().as_millis() as i64,
                );
            }
            Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
                // Block accepted but blobs/columns are still outstanding.
                trace!(
                    %slot,
                    %block_root,
                    "Processed block, waiting for other components"
                );
            }
            Err(BlockError::ParentUnknown { .. }) => {
                // This should not occur. It should be checked by `should_forward_block`.
                // Do not send sync message UnknownParentBlock to prevent conflicts with the
                // BlockComponentProcessed message below. If this error ever happens, lookup sync
                // can recover by receiving another block / blob / attestation referencing the
                // chain that includes this block.
                error!(
                    %block_root,
                    %peer_id,
                    "Block with unknown parent attempted to be processed"
                );
            }
            Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
                // Execution payload error that does not implicate the gossiping
                // peer (per `penalize_peer()`), so no penalty is applied.
                debug!(
                    error = %e,
                    "Failed to verify execution payload"
                );
            }
            Err(BlockError::AvailabilityCheck(err)) => {
                match err.category() {
                    AvailabilityCheckErrorCategory::Internal => {
                        warn!(
                            error = ?err,
                            "Internal availability check error"
                        );
                    }
                    AvailabilityCheckErrorCategory::Malicious => {
                        // Note: we cannot penalize the peer that sent us the block
                        // over gossip here because these errors imply either an issue
                        // with:
                        // 1. Blobs we have received over non-gossip sources
                        //    (from potentially other peers)
                        // 2. The proposer being malicious and sending inconsistent
                        //    blocks and blobs.
                        warn!(
                            error = ?err,
                            "Received invalid blob or malicious proposer"
                        );
                    }
                }
            }
            other => {
                // Any other error means the block itself failed import; hold the
                // sending peer partially responsible.
                debug!(
                    outcome = ?other,
                    ?block_root,
                    block_slot = %block.slot(),
                    "Invalid gossip beacon block"
                );
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::MidToleranceError,
                    "bad_gossip_block_ssz",
                );
                // Dump the raw SSZ at trace level to aid debugging the failure.
                trace!(
                    ssz = format_args!("0x{}", hex::encode(block.as_ssz_bytes())),
                    "Invalid gossip beacon block ssz"
                );
            }
        };

        // Possibly persist the invalid block for later inspection (depends on
        // the `invalid_block_storage` configuration).
        if let Err(e) = &result {
            self.maybe_store_invalid_block(&invalid_block_storage, block_root, &block, e);
        }

        // Always tell sync whether this block ended up fully imported.
        self.send_sync_message(SyncMessage::GossipBlockProcessResult {
            block_root,
            imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
        });
    }

    /// Verify a `SignedVoluntaryExit` received over gossip, report the validation
    /// result to gossipsub, and import the exit into the op pool if it is new.
    pub fn process_gossip_voluntary_exit(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        voluntary_exit: SignedVoluntaryExit,
    ) {
        let validator_index = voluntary_exit.message.validator_index;

        let outcome = self.chain.verify_voluntary_exit_for_gossip(voluntary_exit);
        let verified_exit = match outcome {
            Ok(ObservationOutcome::New(verified_exit)) => verified_exit,
            Ok(ObservationOutcome::AlreadyKnown) => {
                // Duplicate exits are expected on gossip; drop without penalty.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                debug!(
                    validator_index,
                    peer = %peer_id,
                    "Dropping exit for already exiting validator"
                );
                return;
            }
            Err(e) => {
                debug!(
                    validator_index,
                    %peer_id,
                    error = ?e,
                    "Dropping invalid exit"
                );
                // Verification failures can stem from our own chain state, so the
                // peer is not necessarily at fault.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                // Apply a small penalty anyway to discourage repeated invalid messages.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "invalid_gossip_exit",
                );
                return;
            }
        };

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL);

        self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

        // Let the validator monitor observe the exit for any tracked validators.
        self.chain
            .validator_monitor
            .read()
            .register_gossip_voluntary_exit(&verified_exit.as_inner().message);

        self.chain.import_voluntary_exit(verified_exit);

        debug!("Successfully imported voluntary exit");

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL);
    }

    /// Verify a gossiped `ProposerSlashing`, report the validation result to
    /// gossipsub, and import the slashing into the op pool if it is new.
    pub fn process_gossip_proposer_slashing(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        proposer_slashing: ProposerSlashing,
    ) {
        let validator_index = proposer_slashing.signed_header_1.message.proposer_index;

        let outcome = self
            .chain
            .verify_proposer_slashing_for_gossip(proposer_slashing);

        let verified_slashing = match outcome {
            Ok(ObservationOutcome::New(verified_slashing)) => verified_slashing,
            Ok(ObservationOutcome::AlreadyKnown) => {
                // Duplicates are expected on gossip; drop without penalty.
                debug!(
                    reason = "Already seen a proposer slashing for that validator",
                    validator_index,
                    peer = %peer_id,
                    "Dropping proposer slashing"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return;
            }
            Err(e) => {
                // Verification failures may be caused by our own chain state rather
                // than by a malicious peer.
                debug!(
                    validator_index,
                    %peer_id,
                    error = ?e,
                    "Dropping invalid proposer slashing"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                // Small penalty to discourage repeated invalid messages.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "invalid_gossip_proposer_slashing",
                );
                return;
            }
        };

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);

        self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

        // Let the validator monitor observe the slashing for any tracked validators.
        self.chain
            .validator_monitor
            .read()
            .register_gossip_proposer_slashing(verified_slashing.as_inner());

        self.chain.import_proposer_slashing(verified_slashing);
        debug!("Successfully imported proposer slashing");

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
    }

    /// Verify a gossiped `AttesterSlashing`, report the validation result to
    /// gossipsub, and import the slashing into the op pool if it is new.
    pub fn process_gossip_attester_slashing(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        attester_slashing: AttesterSlashing<T::EthSpec>,
    ) {
        let outcome = self
            .chain
            .verify_attester_slashing_for_gossip(attester_slashing);

        let verified_slashing = match outcome {
            Ok(ObservationOutcome::New(verified_slashing)) => verified_slashing,
            Ok(ObservationOutcome::AlreadyKnown) => {
                // Duplicates are expected on gossip; drop without penalty.
                debug!(
                    reason = "Slashings already known for all slashed validators",
                    peer = %peer_id,
                    "Dropping attester slashing"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return;
            }
            Err(e) => {
                debug!(
                    %peer_id,
                    error = ?e,
                    "Dropping invalid attester slashing"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                // Small penalty to discourage repeated invalid messages.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "invalid_gossip_attester_slashing",
                );
                return;
            }
        };

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);

        self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

        // Let the validator monitor observe the slashing for any tracked validators.
        self.chain
            .validator_monitor
            .read()
            .register_gossip_attester_slashing(verified_slashing.as_inner().to_ref());

        self.chain.import_attester_slashing(verified_slashing);
        debug!("Successfully imported attester slashing");
        metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
    }

    /// Verify a gossiped `SignedBlsToExecutionChange`, report the validation
    /// result to gossipsub, and import the change into the op pool if it is new.
    pub fn process_gossip_bls_to_execution_change(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        bls_to_execution_change: SignedBlsToExecutionChange,
    ) {
        let validator_index = bls_to_execution_change.message.validator_index;
        let address = bls_to_execution_change.message.to_execution_address;

        let outcome = self
            .chain
            .verify_bls_to_execution_change_for_gossip(bls_to_execution_change);

        let verified_change = match outcome {
            Ok(ObservationOutcome::New(verified_change)) => verified_change,
            Ok(ObservationOutcome::AlreadyKnown) => {
                // Duplicates are expected on gossip; drop without penalty.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                debug!(
                    validator_index,
                    peer = %peer_id,
                    "Dropping BLS to execution change"
                );
                return;
            }
            Err(e) => {
                debug!(
                    validator_index,
                    %peer_id,
                    error = ?e,
                    "Dropping invalid BLS to execution change"
                );
                // Pre-capella messages are ignored without penalty; any other failure
                // is rejected and the sender penalized slightly.
                let pre_capella = matches!(e, BeaconChainError::BlsToExecutionPriorToCapella);
                let acceptance = if pre_capella {
                    MessageAcceptance::Ignore
                } else {
                    MessageAcceptance::Reject
                };
                self.propagate_validation_result(message_id, peer_id, acceptance);
                if !pre_capella {
                    self.gossip_penalize_peer(
                        peer_id,
                        PeerAction::HighToleranceError,
                        "invalid_bls_to_execution_change",
                    );
                }
                return;
            }
        };

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_VERIFIED_TOTAL);

        self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

        // Gossip address changes are only processed *after* the Capella fork epoch,
        // so they can never be flagged as received pre-capella.
        let received_pre_capella = ReceivedPreCapella::No;

        self.chain
            .import_bls_to_execution_change(verified_change, received_pre_capella);

        debug!(
            validator_index,
            ?address,
            "Successfully imported BLS to execution change"
        );

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_IMPORTED_TOTAL);
    }

    /// Process a sync committee signature received from gossip:
    ///
    /// - If it passes gossip propagation criteria, tell the network thread to forward it.
    /// - Attempt to add it to the naive aggregation pool.
    ///
    /// Raises a log if there are errors.
    pub fn process_gossip_sync_committee_signature(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        sync_signature: SyncCommitteeMessage,
        subnet_id: SyncSubnetId,
        seen_timestamp: Duration,
    ) {
        let message_slot = sync_signature.slot;

        let verified_signature = match self
            .chain
            .verify_sync_committee_message_for_gossip(sync_signature, subnet_id)
        {
            Ok(verified_signature) => verified_signature,
            Err(e) => {
                // Report the failure to gossipsub and (possibly) penalize the peer.
                self.handle_sync_committee_message_failure(
                    peer_id,
                    message_id,
                    "sync_signature",
                    e,
                    message_slot,
                    seen_timestamp,
                );
                return;
            }
        };

        // Forward the message on the network only if it is still timely.
        self.propagate_sync_message_if_timely(message_slot, message_id, peer_id);

        // Register the sync signature with any monitored validators.
        self.chain
            .validator_monitor
            .read()
            .register_gossip_sync_committee_message(
                seen_timestamp,
                verified_signature.sync_message(),
                &self.chain.slot_clock,
            );

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_VERIFIED_TOTAL);

        let pool_result = self
            .chain
            .add_to_naive_sync_aggregation_pool(verified_signature);
        if let Err(e) = pool_result {
            debug!(
                reason = ?e,
                %peer_id,
                "Sync committee signature invalid for agg pool"
            )
        }

        metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_IMPORTED_TOTAL);
    }

    /// Process a sync committee contribution received from gossip:
    ///
    /// - If it passes gossip propagation criteria, tell the network thread to forward it.
    /// - Attempt to add it to the block inclusion pool.
    ///
    /// Raises a log if there are errors.
    pub fn process_sync_committee_contribution(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        sync_contribution: SignedContributionAndProof<T::EthSpec>,
        seen_timestamp: Duration,
    ) {
        let contribution_slot = sync_contribution.message.contribution.slot;

        let verified_contribution = match self
            .chain
            .verify_sync_contribution_for_gossip(sync_contribution)
        {
            Ok(verified_contribution) => verified_contribution,
            Err(e) => {
                // Report the failure to gossipsub
                self.handle_sync_committee_message_failure(
                    peer_id,
                    message_id,
                    "sync_contribution",
                    e,
                    contribution_slot,
                    seen_timestamp,
                );
                return;
            }
        };

        // Forward the message on the network only if it is still timely.
        self.propagate_sync_message_if_timely(contribution_slot, message_id, peer_id);

        // Register the contribution with any monitored validators.
        self.chain
            .validator_monitor
            .read()
            .register_gossip_sync_committee_contribution(
                seen_timestamp,
                verified_contribution.aggregate(),
                verified_contribution.participant_pubkeys(),
                &self.chain.slot_clock,
            );
        metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_VERIFIED_TOTAL);

        let pool_result = self
            .chain
            .add_contribution_to_block_inclusion_pool(verified_contribution);
        if let Err(e) = pool_result {
            debug!(
                reason = ?e,
                %peer_id,
                "Sync contribution invalid for op pool"
            )
        }
        metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_IMPORTED_TOTAL);
    }

    pub fn process_gossip_finality_update(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        light_client_finality_update: LightClientFinalityUpdate<T::EthSpec>,
        seen_timestamp: Duration,
    ) {
        match self
            .chain
            .verify_finality_update_for_gossip(light_client_finality_update, seen_timestamp)
        {
            Ok(_verified_light_client_finality_update) => {
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
            }
            Err(e) => {
                metrics::register_finality_update_error(&e);
                match e {
                    LightClientFinalityUpdateError::MismatchedSignatureSlot { .. }
                    | LightClientFinalityUpdateError::MismatchedAttestedHeader { .. }
                    | LightClientFinalityUpdateError::MismatchedFinalizedHeader { .. }
                    | LightClientFinalityUpdateError::MismatchedProofOrSyncAggregate { .. } => {
                        debug!(
                            %peer_id,
                            error = ?e,
                            "Light client invalid finality update"
                        );

                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "light_client_gossip_error",
                        );
                    }
                    LightClientFinalityUpdateError::TooEarly => {
                        debug!(
                            %peer_id,
                            error = ?e,
                            "Light client finality update too early"
                        );

                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "light_client_gossip_error",
                        );
                    }
                    LightClientFinalityUpdateError::SigSlotStartIsNone
                    | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
                        %peer_id,
                        error = ?e,
                        "Light client error constructing finality update"
                    ),
                    LightClientFinalityUpdateError::Ignore => {}
                }
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
        };
    }

    /// Process a `LightClientOptimisticUpdate` received from gossip.
    ///
    /// Valid updates are accepted and propagated. An update referencing an
    /// unknown parent block is queued for re-processing (when `allow_reprocess`
    /// is set) so it can be retried once the block arrives; all other failures
    /// are ignored, with a small peer penalty for provably invalid updates.
    pub fn process_gossip_optimistic_update(
        self: &Arc<Self>,
        message_id: MessageId,
        peer_id: PeerId,
        light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
        allow_reprocess: bool,
        seen_timestamp: Duration,
    ) {
        match self.chain.verify_optimistic_update_for_gossip(
            light_client_optimistic_update.clone(),
            seen_timestamp,
        ) {
            Ok(verified_light_client_optimistic_update) => {
                debug!(
                    %peer_id,
                    parent_root = %verified_light_client_optimistic_update.parent_root,
                    "Light client successful optimistic update"
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
            }
            Err(e) => {
                match e {
                    LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => {
                        debug!(
                            %peer_id,
                            ?parent_root,
                            "Optimistic update for unknown block"
                        );

                        if allow_reprocess {
                            // Fix: only count updates that are actually sent to the
                            // re-processing queue. Previously this counter was bumped
                            // before the `allow_reprocess` check, so updates dropped on
                            // the branch below were also miscounted as "sent".
                            metrics::inc_counter(
                                &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES,
                            );
                            let processor = self.clone();
                            let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(
                                QueuedLightClientUpdate {
                                    parent_root,
                                    process_fn: Box::new(move || {
                                        processor.process_gossip_optimistic_update(
                                            message_id,
                                            peer_id,
                                            light_client_optimistic_update,
                                            false, // Do not reprocess this message again.
                                            seen_timestamp,
                                        )
                                    }),
                                },
                            );

                            // No validation result is reported on this path: it is
                            // deferred until the queued re-processing runs.
                            if self
                                .beacon_processor_send
                                .try_send(WorkEvent {
                                    drop_during_sync: true,
                                    work: Work::Reprocess(msg),
                                })
                                .is_err()
                            {
                                error!("Failed to send optimistic update for re-processing")
                            }
                        } else {
                            debug!(
                                %peer_id,
                                ?parent_root,
                                "Not sending light client update because it had been reprocessed"
                            );

                            self.propagate_validation_result(
                                message_id,
                                peer_id,
                                MessageAcceptance::Ignore,
                            );
                        }
                        // Early return: this arm must not fall through to the
                        // unconditional `Ignore` below (that would report twice).
                        return;
                    }
                    LightClientOptimisticUpdateError::MismatchedSignatureSlot { .. }
                    | LightClientOptimisticUpdateError::MismatchedAttestedHeader { .. }
                    | LightClientOptimisticUpdateError::MismatchedSyncAggregate { .. } => {
                        metrics::register_optimistic_update_error(&e);

                        debug!(
                            %peer_id,
                            error = ?e,
                            "Light client invalid optimistic update"
                        );

                        // The update is provably inconsistent; penalize lightly.
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "light_client_gossip_error",
                        )
                    }
                    LightClientOptimisticUpdateError::TooEarly => {
                        metrics::register_optimistic_update_error(&e);
                        debug!(
                            %peer_id,
                            error = ?e,
                            "Light client optimistic update too early"
                        );

                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::HighToleranceError,
                            "light_client_gossip_error",
                        );
                    }
                    LightClientOptimisticUpdateError::SigSlotStartIsNone
                    | LightClientOptimisticUpdateError::FailedConstructingUpdate => {
                        metrics::register_optimistic_update_error(&e);

                        debug!(
                            %peer_id,
                            error = ?e,
                            "Light client error constructing optimistic update"
                        )
                    }
                    // `Ignore` carries no penalty; fall through to the `Ignore`
                    // propagation below.
                    LightClientOptimisticUpdateError::Ignore => {}
                }
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
        };
    }

    /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
    /// network.
    fn handle_attestation_verification_failure(
        self: &Arc<Self>,
        peer_id: PeerId,
        message_id: MessageId,
        failed_att: FailedAtt<T::EthSpec>,
        allow_reprocess: bool,
        error: AttnError,
        seen_timestamp: Duration,
    ) {
        let beacon_block_root = failed_att.beacon_block_root();
        let attestation_type = failed_att.kind();
        metrics::register_attestation_error(&error);
        match &error {
            AttnError::FutureSlot { .. } => {
                /*
                 * These errors can be triggered by a mismatch between our slot and the peer.
                 *
                 *
                 * The peer has published an invalid consensus message, _only_ if we trust our own clock.
                 */
                trace!(
                    %peer_id,
                    block = ?beacon_block_root,
                    ?attestation_type,
                    "Attestation is not within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots"
                );

                // Peers that are slow or not to spec can spam us with these messages draining our
                // bandwidth. We therefore penalize these peers when they do this.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_future_slot",
                );

                // Do not propagate these messages.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
            AttnError::PastSlot { .. } => {
                // Produce a slot clock frozen at the time we received the message from the
                // network.
                let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp);
                let hindsight_verification =
                    attestation_verification::verify_propagation_slot_range::<_, T::EthSpec>(
                        seen_clock,
                        failed_att.attestation_data(),
                        &self.chain.spec,
                    );

                // Only penalize the peer if it would have been invalid at the moment we received
                // it.
                if STRICT_LATE_MESSAGE_PENALTIES && hindsight_verification.is_err() {
                    self.gossip_penalize_peer(
                        peer_id,
                        PeerAction::LowToleranceError,
                        "attn_past_slot",
                    );
                }

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
            AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
                /*
                 * These errors are caused by invalid signatures.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_selection_proof",
                );
            }
            AttnError::EmptyAggregationBitfield => {
                /*
                 * The aggregate had no signatures and is therefore worthless.
                 *
                 * This is forbidden by the p2p spec. Reject the message.
                 *
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_empty_agg_bitfield",
                );
            }
            AttnError::AggregatorPubkeyUnknown(_) => {
                /*
                 * The aggregator index was higher than any known validator index. This is
                 * possible in two cases:
                 *
                 * 1. The attestation is malformed
                 * 2. The attestation attests to a beacon_block_root that we do not know.
                 *
                 * It should be impossible to reach (2) without triggering
                 * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
                 * faulty.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_agg_pubkey",
                );
            }
            AttnError::AggregatorNotInCommittee { .. } => {
                /*
                 * The aggregator index was higher than any known validator index. This is
                 * possible in two cases:
                 *
                 * 1. The attestation is malformed
                 * 2. The attestation attests to a beacon_block_root that we do not know.
                 *
                 * It should be impossible to reach (2) without triggering
                 * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is
                 * faulty.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_agg_not_in_committee",
                );
            }
            AttnError::AttesterNotInCommittee { .. } => {
                /*
                 * `SingleAttestation` from a validator is invalid because the `attester_index` is
                 * not in the claimed committee. There is no reason a non-faulty validator would
                 * send this message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_single_not_in_committee",
                );
            }
            AttnError::AttestationSupersetKnown { .. } => {
                /*
                 * The aggregate attestation has already been observed on the network or in
                 * a block.
                 *
                 * The peer is not necessarily faulty.
                 */
                trace!(
                    %peer_id,
                    block = ?beacon_block_root,
                    ?attestation_type,
                    "Attestation already known"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return;
            }
            AttnError::AggregatorAlreadyKnown(_) => {
                /*
                 * There has already been an aggregate attestation seen from this
                 * aggregator index.
                 *
                 * The peer is not necessarily faulty.
                 */
                trace!(
                    %peer_id,
                    block = ?beacon_block_root,
                    ?attestation_type,
                    "Aggregator already known"
                );
                // This is an allowed behaviour.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                return;
            }
            AttnError::PriorAttestationKnown {
                validator_index,
                epoch,
            } => {
                /*
                 * We have already seen an attestation from this validator for this epoch.
                 *
                 * The peer is not necessarily faulty.
                 */
                debug!(
                    %peer_id,
                    block = ?beacon_block_root,
                    %epoch,
                    validator_index,
                    ?attestation_type,
                    "Prior attestation known"
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                return;
            }
            AttnError::ValidatorIndexTooHigh(_) => {
                /*
                 * The aggregator index (or similar field) was higher than the maximum
                 * possible number of validators.
                 *
                 * The peer has published an invalid consensus message.
                 */
                debug!(
                    %peer_id,
                    block = ?beacon_block_root,
                    ?attestation_type,
                    "Validation Index too high"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_val_index_too_high",
                );
            }
            AttnError::CommitteeIndexNonZero(index) => {
                /*
                 * The validator index is not set to zero after Electra.
                 *
                 * The peer has published an invalid consensus message.
                 */
                debug!(
                    %peer_id,
                    block = ?beacon_block_root,
                    ?attestation_type,
                    committee_index = index,
                    "Committee index non zero"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_comm_index_non_zero",
                );
            }
            AttnError::UnknownHeadBlock { beacon_block_root } => {
                trace!(
                    %peer_id,
                    block = ?beacon_block_root,
                    "Attestation for unknown block"
                );
                if allow_reprocess {
                    // We don't know the block, get the sync manager to handle the block lookup, and
                    // send the attestation to be scheduled for re-processing.
                    self.sync_tx
                        .send(SyncMessage::UnknownBlockHashFromAttestation(
                            peer_id,
                            *beacon_block_root,
                        ))
                        .unwrap_or_else(|_| {
                            warn!(msg = "UnknownBlockHash", "Failed to send to sync service")
                        });
                    let msg = match failed_att {
                        FailedAtt::Aggregate {
                            attestation,
                            seen_timestamp,
                        } => {
                            metrics::inc_counter(
                                &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL,
                            );
                            let processor = self.clone();
                            ReprocessQueueMessage::UnknownBlockAggregate(QueuedAggregate {
                                beacon_block_root: *beacon_block_root,
                                process_fn: Box::new(move || {
                                    processor.process_gossip_aggregate(
                                        message_id,
                                        peer_id,
                                        attestation,
                                        false, // Do not allow this attestation to be re-processed beyond this point.
                                        seen_timestamp,
                                    )
                                }),
                            })
                        }
                        FailedAtt::Unaggregate {
                            attestation,
                            subnet_id,
                            should_import,
                            seen_timestamp,
                        } => {
                            metrics::inc_counter(
                                &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL,
                            );
                            let processor = self.clone();
                            ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
                                beacon_block_root: *beacon_block_root,
                                process_fn: Box::new(move || {
                                    processor.process_gossip_attestation(
                                        message_id,
                                        peer_id,
                                        attestation,
                                        subnet_id,
                                        should_import,
                                        false, // Do not allow this attestation to be re-processed beyond this point.
                                        seen_timestamp,
                                    )
                                }),
                            })
                        }
                    };

                    if self
                        .beacon_processor_send
                        .try_send(WorkEvent {
                            drop_during_sync: false,
                            work: Work::Reprocess(msg),
                        })
                        .is_err()
                    {
                        error!("Failed to send attestation for re-processing")
                    }
                } else {
                    // We shouldn't make any further attempts to process this attestation.
                    //
                    // Don't downscore the peer since it's not clear if we requested this head
                    // block from them or not.
                    self.propagate_validation_result(
                        message_id,
                        peer_id,
                        MessageAcceptance::Ignore,
                    );
                }

                return;
            }
            AttnError::UnknownTargetRoot(_) => {
                /*
                 * The block indicated by the target root is not known to us.
                 *
                 * We should always get `AttnError::UnknownHeadBlock` before we get this
                 * error, so this means we can get this error if:
                 *
                 * 1. The target root does not represent a valid block.
                 * 2. We do not have the target root in our DB.
                 *
                 * For (2), we should only be processing attestations when we should have
                 * all the available information. Note: if we do a weak-subjectivity sync
                 * it's possible that this situation could occur, but I think it's
                 * unlikely. For now, we will declare this to be an invalid message.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_unknown_target",
                );
            }
            AttnError::BadTargetEpoch => {
                /*
                 * The attestation's target epoch did not match the epoch of its slot.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_bad_target",
                );
            }
            AttnError::NoCommitteeForSlotAndIndex { .. } => {
                /*
                 * It is not possible to attest to the given committee in the given slot.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_no_committee",
                );
            }
            AttnError::NotExactlyOneCommitteeBitSet(_) => {
                /*
                 * The attestation doesn't have only one committee bit set.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_too_many_comm_bits",
                );
            }
            AttnError::AttestsToFutureBlock { .. } => {
                /*
                 * The beacon_block_root is from a higher slot than the attestation.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_future_block",
                );
            }
            AttnError::InvalidSubnetId { received, expected } => {
                /*
                 * The attestation was received on an incorrect subnet id.
                 */
                debug!(
                    ?expected,
                    ?received,
                    "Received attestation on incorrect subnet"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_invalid_subnet_id",
                );
            }
            AttnError::Invalid(_) => {
                /*
                 * The attestation failed the state_processing verification.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_invalid_state_processing",
                );
            }
            AttnError::InvalidTargetEpoch { .. } => {
                /*
                 * The attestation is malformed.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_invalid_target_epoch",
                );
            }
            AttnError::InvalidTargetRoot { .. } => {
                /*
                 * The attestation is malformed.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "attn_invalid_target_root",
                );
            }
            AttnError::TooManySkippedSlots {
                head_block_slot,
                attestation_slot,
            } => {
                /*
                 * The attestation references a head block that is too far behind the attestation slot.
                 *
                 * The message is not necessarily invalid, but we choose to ignore it.
                 */
                debug!(
                    %head_block_slot,
                    %attestation_slot,
                    "Rejected long skip slot attestation"
                );
                // In this case we wish to penalize gossipsub peers that do this to avoid future
                // attestations that have too many skip slots.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::MidToleranceError,
                    "attn_too_many_skipped_slots",
                );
            }
            AttnError::HeadBlockFinalized { beacon_block_root } => {
                debug!(
                    block_root = ?beacon_block_root,
                    attestation_slot = %failed_att.attestation_data().slot,
                    "Ignored attestation to finalized block"
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                // The peer that sent us this could be a lagger, or a spammer, or this failure could
                // be due to us processing attestations extremely slowly. Don't be too harsh.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "attn_to_finalized_block",
                );
            }
            AttnError::BeaconChainError(e) => {
                match e.as_ref() {
                    BeaconChainError::DBError(Error::HotColdDBError(
                        HotColdDBError::FinalizedStateNotInHotDatabase { .. },
                    )) => {
                        debug!(%peer_id, "Attestation for finalized state");
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                    BeaconChainError::MaxCommitteePromises(e) => {
                        debug!(
                            target_root = ?failed_att.attestation_data().target.root,
                            ?beacon_block_root,
                            slot = ?failed_att.attestation_data().slot,
                            ?attestation_type,
                            error = ?e,
                            %peer_id,
                            "Dropping attestation"
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                    BeaconChainError::AttestationValidationError(e) => {
                        // Failures from `get_attesting_indices` end up here.
                        debug!(
                            %peer_id,
                            block_root = ?beacon_block_root,
                            attestation_slot = %failed_att.attestation_data().slot,
                            error = ?e,
                            "Rejecting attestation that failed validation"
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Reject,
                        );
                        self.gossip_penalize_peer(
                            peer_id,
                            PeerAction::MidToleranceError,
                            "attn_validation_error",
                        );
                    }
                    _ => {
                        /*
                         * Lighthouse hit an unexpected error whilst processing the attestation. It
                         * should be impossible to trigger a `BeaconChainError` from the network,
                         * so we have a bug.
                         *
                         * It's not clear if the message is invalid/malicious.
                         */
                        error!(
                            ?beacon_block_root,
                            slot = ?failed_att.attestation_data().slot,
                            ?attestation_type,
                            %peer_id,
                            error = ?e,
                            "Unable to validate attestation"
                        );
                        self.propagate_validation_result(
                            message_id,
                            peer_id,
                            MessageAcceptance::Ignore,
                        );
                    }
                }
            }
        }

        debug!(
            reason = ?error,
            block = ?beacon_block_root,
            %peer_id,
            ?attestation_type,
            "Invalid attestation from network"
        );
    }

    /// Handle an error whilst verifying a `SyncCommitteeMessage` or `SignedContributionAndProof` from the
    /// network.
    pub fn handle_sync_committee_message_failure(
        &self,
        peer_id: PeerId,
        message_id: MessageId,
        message_type: &str,
        error: SyncCommitteeError,
        sync_committee_message_slot: Slot,
        seen_timestamp: Duration,
    ) {
        metrics::register_sync_committee_error(&error);

        match &error {
            SyncCommitteeError::FutureSlot { .. } => {
                /*
                 * This error can be triggered by a mismatch between our slot and the peer.
                 *
                 *
                 * The peer has published an invalid consensus message, _only_ if we trust our own clock.
                 */
                trace!(
                    %peer_id,
                    ?message_type,
                    "Sync committee message is not within the last MAXIMUM_GOSSIP_CLOCK_DISPARITY slots"
                );

                // Unlike attestations, we have a zero slot buffer in case of sync committee messages,
                // so we don't penalize heavily.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "sync_future_slot",
                );

                // Do not propagate these messages.
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
            SyncCommitteeError::PastSlot { .. } => {
                /*
                 * This error can be triggered by a mismatch between our slot and the peer.
                 *
                 *
                 * The peer has published an invalid consensus message, _only_ if we trust our own clock.
                 */
                trace!(
                    %peer_id,
                    ?message_type,
                    "Sync committee message is not within the last MAXIMUM_GOSSIP_CLOCK_DISPARITY slots"
                );

                // Compute the slot when we received the message.
                let received_slot = self
                    .chain
                    .slot_clock
                    .slot_of(seen_timestamp)
                    .unwrap_or_else(|| self.chain.slot_clock.genesis_slot());

                // The message is "excessively" late if it was more than one slot late.
                let excessively_late = received_slot > sync_committee_message_slot + 1;

                // This closure will lazily produce a slot clock frozen at the time we received the
                // message from the network and return a bool indicating if the message was invalid
                // at the time of receipt too.
                let invalid_in_hindsight = || {
                    let seen_clock = &self.chain.slot_clock.freeze_at(seen_timestamp);
                    let hindsight_verification =
                        sync_committee_verification::verify_propagation_slot_range(
                            seen_clock,
                            &sync_committee_message_slot,
                            &self.chain.spec,
                        );
                    hindsight_verification.is_err()
                };

                // Penalize the peer if the message was more than one slot late
                if STRICT_LATE_MESSAGE_PENALTIES && excessively_late && invalid_in_hindsight() {
                    self.gossip_penalize_peer(
                        peer_id,
                        PeerAction::HighToleranceError,
                        "sync_past_slot",
                    );
                }

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
            SyncCommitteeError::EmptyAggregationBitfield => {
                /*
                 * The aggregate had no signatures and is therefore worthless.
                 *
                 * This is forbidden by the p2p spec. Reject the message.
                 *
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_empty_agg_bitfield",
                );
            }
            SyncCommitteeError::InvalidSelectionProof { .. }
            | SyncCommitteeError::InvalidSignature => {
                /*
                 * These errors are caused by invalid signatures.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_invalid_proof_or_sig",
                );
            }
            SyncCommitteeError::AggregatorNotInCommittee { .. }
            | SyncCommitteeError::AggregatorPubkeyUnknown(_) => {
                /*
                * The aggregator is not in the committee for the given `ContributionAndSync` OR
                  The aggregator index was higher than any known validator index
                *
                * The peer has published an invalid consensus message.
                */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_bad_aggregator",
                );
            }
            SyncCommitteeError::SyncContributionSupersetKnown(_)
            | SyncCommitteeError::AggregatorAlreadyKnown(_) => {
                /*
                 * The sync committee message already been observed on the network or in
                 * a block.
                 *
                 * The peer is not necessarily faulty.
                 */
                trace!(
                    %peer_id,
                    ?message_type,
                    "Sync committee message is already known"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return;
            }
            SyncCommitteeError::UnknownValidatorIndex(_) => {
                /*
                 * The aggregator index (or similar field) was higher than the maximum
                 * possible number of validators.
                 *
                 * The peer has published an invalid consensus message.
                 */
                debug!(
                    %peer_id,
                    ?message_type,
                    "Validation Index too high"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_unknown_validator",
                );
            }
            SyncCommitteeError::UnknownValidatorPubkey(_) => {
                debug!(
                    %peer_id,
                    ?message_type,
                    "Validator pubkey is unknown"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_unknown_validator_pubkey",
                );
            }
            SyncCommitteeError::InvalidSubnetId { received, expected } => {
                /*
                 * The sync committee message was received on an incorrect subnet id.
                 */
                debug!(
                    ?expected,
                    ?received,
                    "Received sync committee message on incorrect subnet"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_invalid_subnet_id",
                );
            }
            SyncCommitteeError::Invalid(_) => {
                /*
                 * The sync committee message failed the state_processing verification.
                 *
                 * The peer has published an invalid consensus message.
                 */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_invalid_state_processing",
                );
            }
            SyncCommitteeError::PriorSyncCommitteeMessageKnown { .. } => {
                /*
                 * We have already seen a sync committee message from this validator for this epoch.
                 *
                 * The peer is not necessarily faulty.
                 */
                debug!(
                    %peer_id,
                    ?message_type,
                    "Prior sync committee message known"
                );

                // Do not penalize the peer.

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                return;
            }
            SyncCommitteeError::PriorSyncContributionMessageKnown { .. } => {
                /*
                 * We have already seen a sync contribution message from this validator for this epoch.
                 *
                 * The peer is not necessarily faulty.
                 */
                debug!(
                    %peer_id,
                    ?message_type,
                    "Prior sync contribution message known"
                );
                // We still penalize the peer slightly. We don't want this to be a recurring
                // behaviour.
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "sync_prior_known",
                );

                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

                return;
            }
            SyncCommitteeError::BeaconChainError(e) => {
                /*
                 * Lighthouse hit an unexpected error whilst processing the sync committee message. It
                 * should be impossible to trigger a `BeaconChainError` from the network,
                 * so we have a bug.
                 *
                 * It's not clear if the message is invalid/malicious.
                 */
                error!(
                    %peer_id,
                    error = ?e,
                    "Unable to validate sync committee message"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
            }
            SyncCommitteeError::BeaconStateError(e) => {
                /*
                 * Lighthouse hit an unexpected error whilst processing the sync committee message. It
                 * should be impossible to trigger a `BeaconStateError` from the network,
                 * so we have a bug.
                 *
                 * It's not clear if the message is invalid/malicious.
                 */
                error!(
                    %peer_id,
                    error = ?e,
                    "Unable to validate sync committee message"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                // Penalize the peer slightly
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "sync_beacon_state_error",
                );
            }
            SyncCommitteeError::ContributionError(e) => {
                error!(
                    %peer_id,
                    error = ?e,
                    "Error while processing sync contribution"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                // Penalize the peer slightly
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "sync_contribution_error",
                );
            }
            SyncCommitteeError::SyncCommitteeError(e) => {
                error!(
                    %peer_id,
                    error = ?e,
                    "Error while processing sync committee message"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                // Penalize the peer slightly
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::HighToleranceError,
                    "sync_committee_error",
                );
            }
            SyncCommitteeError::ArithError(e) => {
                /*
                This would most likely imply incompatible configs or an invalid message.
                */
                error!(
                    %peer_id,
                    error = ?e,
                    "Arithematic error while processing sync committee message"
                );
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_arith_error",
                );
            }
            SyncCommitteeError::InvalidSubcommittee { .. } => {
                /*
                The subcommittee index is higher than `SYNC_COMMITTEE_SUBNET_COUNT`. This would imply
                an invalid message.
                */
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
                self.gossip_penalize_peer(
                    peer_id,
                    PeerAction::LowToleranceError,
                    "sync_invalid_subcommittee",
                );
            }
        }
        debug!(
            reason = ?error,
            %peer_id,
            ?message_type,
            "Invalid sync committee message from network"
        );
    }

    /// Propagate (accept) if `is_timely == true`, otherwise ignore.
    fn propagate_if_timely(&self, is_timely: bool, message_id: MessageId, peer_id: PeerId) {
        let acceptance = if is_timely {
            // Still relevant to the network: let gossipsub forward it.
            MessageAcceptance::Accept
        } else {
            // No longer relevant — it may have become stale while we processed it, or it
            // was received invalid. Drop it without penalizing the peer.
            MessageAcceptance::Ignore
        };
        self.propagate_validation_result(message_id, peer_id, acceptance);
    }

    /// If an attestation (agg. or unagg.) is still valid with respect to the current time (i.e.,
    /// timely), propagate it on gossip. Otherwise, ignore it.
    fn propagate_attestation_if_timely(
        &self,
        attestation: AttestationRef<T::EthSpec>,
        message_id: MessageId,
        peer_id: PeerId,
    ) {
        // The attestation is timely iff its slot still lies within the allowed propagation
        // window relative to our slot clock.
        let timeliness = attestation_verification::verify_propagation_slot_range::<_, T::EthSpec>(
            &self.chain.slot_clock,
            attestation.data(),
            &self.chain.spec,
        );

        self.propagate_if_timely(timeliness.is_ok(), message_id, peer_id)
    }

    /// If a sync committee signature or sync committee contribution is still valid with respect to
    /// the current time (i.e., timely), propagate it on gossip. Otherwise, ignore it.
    fn propagate_sync_message_if_timely(
        &self,
        sync_message_slot: Slot,
        message_id: MessageId,
        peer_id: PeerId,
    ) {
        // A sync message is only relevant during its own slot. If the slot
        // clock cannot be read, conservatively treat the message as stale.
        let timely = match self.chain.slot_clock.now() {
            Some(current_slot) => current_slot == sync_message_slot,
            None => false,
        };

        self.propagate_if_timely(timely, message_id, peer_id)
    }

    /// Stores a block as a SSZ file, if and where `invalid_block_storage` dictates.
    ///
    /// Writes two sibling files under the configured base directory:
    /// `<slot>_<root>.ssz` (the SSZ-encoded block) and `<slot>_<root>.error`
    /// (the stringified `BlockError`). Existing files are never overwritten,
    /// so only the first error observed for a given block root is recorded.
    fn maybe_store_invalid_block(
        &self,
        invalid_block_storage: &InvalidBlockStorage,
        block_root: Hash256,
        block: &SignedBeaconBlock<T::EthSpec>,
        error: &BlockError,
    ) {
        if let InvalidBlockStorage::Enabled(base_dir) = invalid_block_storage {
            let block_path = base_dir.join(format!("{}_{:?}.ssz", block.slot(), block_root));
            let error_path = base_dir.join(format!("{}_{:?}.error", block.slot(), block_root));

            let write_file = |path: PathBuf, bytes: &[u8]| {
                // No need to write the same file twice. For the error file,
                // this means that we'll remember the first error message but
                // forget the rest.
                if path.exists() {
                    return;
                }

                // Write to the file.
                let write_result = fs::OpenOptions::new()
                    // Only succeed if the file doesn't already exist. We should
                    // have checked for this earlier.
                    .create_new(true)
                    .write(true)
                    .open(&path)
                    .map_err(|e| format!("Failed to open file: {:?}", e))
                    // Use `and_then` (not `map`) so a `write_all` failure
                    // surfaces as the outer `Err`. With `map`, the result was
                    // `Ok(Err(_))` and write failures were logged as success.
                    .and_then(|mut file| {
                        file.write_all(bytes)
                            .map_err(|e| format!("Failed to write file: {:?}", e))
                    });
                if let Err(e) = write_result {
                    error!(
                        error = e,
                        ?path,
                        ?block_root,
                        slot = %block.slot(),
                        "Failed to store invalid block/error"
                    )
                } else {
                    info!(
                        ?path,
                        ?block_root,
                        slot = %block.slot(),
                        "Stored invalid block/error"
                    )
                }
            };

            write_file(block_path, &block.as_ssz_bytes());
            write_file(error_path, error.to_string().as_bytes());
        }
    }
}
